Kafak环境搭建

Docker创建kafka数据dev环境

1
2
3
docker run -d --name data-dev --restart always --net=host -e ADV_HOST=172.16.26.193 landoop/fast-data-dev
bin/kafka-topics.sh --create --zookeeper 172.16.26.193:2181 --replication-factor 1 --partitions 5 --topic my-replicated-topic
bin/kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --partitions 5 --topic nginx_log

下载kafka

http://kafka.apache.org/quickstart

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.out 2>&1 &

启动kafka单机

bin/kafka-server-start.sh config/server.properties >>kafka.out 2>&1 &

集群启动

bin/kafka-server-start.sh config/server.properties >>kafka-0.out 2>&1 &
bin/kafka-server-start.sh config/server-1.properties >>kafka-1.out 2>&1 &
bin/kafka-server-start.sh config/server-2.properties >>kafka-2.out 2>&1 &

监控

Kafka三款监控工具比较

Kafka监控工具KafkaOffsetMonitor配置及使用

下载KakfaOffsetMonitor

1
2
3
4
5
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m  -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8088 \
--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \
--refresh 5.minutes \
--retain 1.day >/dev/null 2>&1;

TODO 监控原理

PS: kafka 日志默认保存7天.

topic创建

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic my-replicated-topic

命令行消费者

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic nginx_log --from-beginning

ubuntu命令行设置系统代理

增加partition

1
bin/kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --partitions 10 --topic nginx_log

查看某个topic的 logSize

指的是topic各个分区的logSize

1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 --topic nginx_log --time -1

time 为-2 表示查看offset的最小值, -1 表示最大值

1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 -topic nginx_log --time -2

查看消费者组内的offset位置(消费情况)

关于kafka更改消费者对应分组下的offset值

1
2
# To view offsets, as mentioned earlier, we "describe" the consumer group like this:
bin/kafka-consumer-groups.sh --bootstrap-server api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 --group consumer02 --describe

Managing Consumer Groups

Managing Consumer Groups

1
2
3
4
5
6
# 
bin/kafka-consumer-groups.sh --bootstrap-server api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 --list
# provides the list of all active members in the consumer group.
bin/kafka-consumer-groups.sh --bootstrap-server api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 --describe --group consumer02 --members
#
bin/kafka-consumer-groups.sh --bootstrap-server api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 --describe --group consumer02 --state

更改offset

1
2
3
4
# 先查看一下customer的offset状态
bin/kafka-consumer-groups.sh --bootstrap-server api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 --group consumer02 --describe
# reset offsets of a consumer group to the latest offset: (earliest)
bin/kafka-consumer-groups.sh --bootstrap-server api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 --reset-offsets --group consumer02 --topic nginx_log --to-latest

以上reset 只能在 consumer inactive状态时,才可以. 问题: 这个操作的目的和结果是什么???

Kafka auto.offset.reset值详解

1
2
3
4
5
6
earliest 
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常